package cn.rdtimes.wolfdsp.core.invoker;

import cn.rdtimes.wolfdsp.core.conf.Configuration;
import cn.rdtimes.wolfdsp.core.data.BalanceStrategy;
import cn.rdtimes.wolfdsp.core.ha.NameService;
import cn.rdtimes.wolfdsp.core.invoker.http.HttpInvokeObject;
import cn.rdtimes.wolfdsp.core.invoker.http.ResponseMessage;
import cn.rdtimes.wolfdsp.core.service.ScheduleManager;
import cn.rdtimes.wolfdsp.core.util.StringUtil;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.security.SecureRandom;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 缺省实现http调用
 * 使用httpclient组件,暂时不支持tls加密调用
 *
 * @author BZ
 */
class DefaultHttpTaskInvokeHandler
        extends AbstractTaskInvokeHandler<HttpInvokeObject, Map<String, Object>>
        implements TaskHttpInvokeHandler<Map<String, Object>> {

    // 统一使用一个计数器轮询策略
    private final AtomicLong counter = new AtomicLong(0);
    private final Random random = new SecureRandom();

    DefaultHttpTaskInvokeHandler() {
        super(InvokeKind.HTTP, "DefaultHttpTaskInvokeHandler");
    }

    // 只有启动的时候才可以使用微服务名称负载调用,其他的调用都应该是具体的节点ip地址
    // 把实际执行调度的节点ip和端口返回,使用_realNodeIp和_realNodePort为key
    @Override
    public Map<String, Object> invoke(HttpInvokeObject invokeObject) throws Exception {
        if (invokeObject == null) {
            throw new IllegalStateException("invokeObject is error");
        }

        String url;
        String ip;
        int port;
        boolean isTls = false;
        //通过ip地址调用
        if (!StringUtil.isEmpty(invokeObject.getIp())) {
            if (invokeObject.getPort() <= 0) {
                throw new IllegalArgumentException("port is less than 0");
            }
            ip = invokeObject.getIp();
            port = invokeObject.getPort();
            if (invokeObject.isHasTls()) isTls = true;
        } else { //通过微服务名称调用
            if (StringUtil.isEmpty(invokeObject.getServiceName())) {
                throw new IllegalArgumentException("serviceName is null");
            }

            NameService.ServerInfo serverInfo = getServerInfo(invokeObject.getServiceName(),
                    invokeObject.getBalanceStrategy());
            if (serverInfo == null) {
                throw new Exception("get " + invokeObject.getServiceName() + "is null");
            }
            ip = serverInfo.ip;
            port = serverInfo.port;
            if (serverInfo.isTls) isTls = true;
        }
        if (isTls) url = "https://";
        else url = "http://";
        url = url + ip + ":" + port + REQ_START_TASK_URL;

        Configuration conf = ScheduleManager.getInstance().getConfiguration();
        Map<String, Object> requestMap = new HashMap<>();
        requestMap.put("masterIp", conf.getMasterIpAndPort());
        requestMap.put("standbyIp", conf.getStandbyIpAndPort());
        requestMap.put("jobId", invokeObject.getJobId());
        requestMap.put("taskId", invokeObject.getTaskId());
        requestMap.put("jobInstanceId", invokeObject.getJobInstanceId());
        requestMap.put("taskInstanceId", invokeObject.getTaskInstanceId());
        requestMap.put("timestamp", System.currentTimeMillis());
        requestMap.put("inputParams", invokeObject.getInputParam());
        Map<String, Object> ret = sentRequest(url, requestMap);
        ret.put("_retNodeIp", ip);
        ret.put("_realNodePort", port);
        return ret;
    }

    @Override
    public Map<String, Object> invokeStop(HttpInvokeObject invokeObject) throws Exception {
        if (invokeObject == null || StringUtil.isEmpty(invokeObject.getIp()) || invokeObject.getPort() <= 0) {
            throw new IllegalArgumentException("invokeObject is error");
        }
        String url = "http://" + invokeObject.getIp() + ":" + invokeObject.getPort()
                + REQ_STOP_TASK_URL;
        Map<String, Object> requestMap = new HashMap<>();
        requestMap.put("jobId", invokeObject.getJobId());
        requestMap.put("taskId", invokeObject.getTaskId());
        requestMap.put("jobInstanceId", invokeObject.getJobInstanceId());
        requestMap.put("taskInstanceId", invokeObject.getTaskInstanceId());
        requestMap.put("timestamp", System.currentTimeMillis());
        return sentRequest(url, requestMap);
    }

    @Override
    public Map<String, Object> invokeQuery(HttpInvokeObject invokeObject) throws Exception {
        if (invokeObject == null || StringUtil.isEmpty(invokeObject.getIp()) || invokeObject.getPort() <= 0) {
            throw new IllegalArgumentException("invokeObject is error");
        }
        String url = "http://" + invokeObject.getIp() + ":" + invokeObject.getPort()
                + REQ_QUERY_TASK_URL;
        Map<String, Object> requestMap = new HashMap<>();
        requestMap.put("jobId", invokeObject.getJobId());
        requestMap.put("taskId", invokeObject.getTaskId());
        requestMap.put("jobInstanceId", invokeObject.getJobInstanceId());
        requestMap.put("taskInstanceId", invokeObject.getTaskInstanceId());
        requestMap.put("timestamp", System.currentTimeMillis());
        return sentRequest(url, requestMap);
    }

    /**
     * 根据负载策略获取某个微服务地址
     *
     * @param serviceName     微服务名称
     * @param balanceStrategy 0-首个 1-轮询 2-随机
     * @return 微服务地址
     */
    private NameService.ServerInfo getServerInfo(String serviceName, BalanceStrategy balanceStrategy) {
        NameService nameService = ScheduleManager.getInstance().getNameService();
        List<NameService.ServerInfo> serverInfos = nameService.getServerInfo(serviceName);
        if (serverInfos == null || serverInfos.size() == 0) return null;

        NameService.ServerInfo serverInfo;
        int len = serverInfos.size();
        switch (balanceStrategy) {
            case FIRST:
                serverInfo = serverInfos.get(0);
                break;
            case LAST:
                serverInfo = serverInfos.get(len - 1);
                break;
            case RANDOM:
                serverInfo = serverInfos.get(random.nextInt(len - 1));
                break;
            case ROUND_ROBIN:
            default:
                long l = counter.getAndIncrement();
                serverInfo = serverInfos.get((int) (l % len));
        }
        return serverInfo;
    }

    // 需要确认是否可以单件???
    private static final CloseableHttpClient httpClient;
    private static final RequestConfig requestConfig;

    static {
        httpClient = HttpClients.createDefault();
        requestConfig = RequestConfig.custom().setSocketTimeout(30_000).setConnectTimeout(10_000).
                setConnectionRequestTimeout(10_000).setMaxRedirects(3).build();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    httpClient.close();
                } catch (Exception e) {
                    // ignore
                }
            }
        });
    }

    /**
     * 发送请求,Post方法
     *
     * @param url   url
     * @param input 输入数据
     * @return 返回错误是抛出异常
     * @throws Exception 失败抛出异常
     */
    private Map<String, Object> sentRequest(String url, Map<String, Object> input) throws Exception {
        HttpPost httpPost = new HttpPost(url);
        httpPost.setConfig(requestConfig);
        StringEntity entity = new StringEntity(JSON.toJSONString(input), ContentType.APPLICATION_JSON);
        httpPost.setEntity(entity);
        CloseableHttpResponse response = httpClient.execute(httpPost);
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode >= HttpStatus.SC_OK && statusCode < HttpStatus.SC_MULTIPLE_CHOICES) {
            String respEntity = EntityUtils.toString(response.getEntity());
            Map<String, Object> resp = (Map<String, Object>) JSON.parse(respEntity);
            Object code = resp.get("code");
            if (code == null || (int) code != ResponseMessage.SUCC) {
                throw new Exception("response code[" + code + "]");
            }
            return (Map<String, Object>) resp.get("data");
        }
        throw new Exception(response.toString());
    }

}
